import { Tabs } from "nextra/components";
import UniversalTabs from "../../../../components/UniversalTabs";

# The CANCEL_IN_PROGRESS Concurrency Limit Strategy in Hatchet

Hatchet's `CANCEL_IN_PROGRESS` concurrency limit strategy manages resource contention in your workflows by canceling currently running workflow instances to free up slots for new instances when the concurrency limit is reached.

## How it works

When a new workflow run is queued and the concurrency limit has been reached for the workflow version and group key, the `CANCEL_IN_PROGRESS` strategy will:

1. Fetch all currently running workflow runs for the given workflow version and group key.
2. Fetch the queued workflow runs for the same workflow version and group key, up to the maximum number of concurrent runs allowed.
3. Cancel the oldest running workflow runs to make room for the queued runs, ensuring that there is space for all the queued runs to start executing.
4. Start executing the queued workflow runs in the freed-up slots.

This strategy ensures that queued workflow runs can start executing as soon as possible, even if the concurrency limit is reached, by canceling older running instances.
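
The steps above can be sketched as a small in-memory simulation. This is an illustration only, not Hatchet's engine code: `Run`, `Group`, and `cancel_in_progress` are hypothetical names, one `Group` stands for a single workflow version and group key, and admitting queued runs oldest-first is an assumption.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Run:
    run_id: str


@dataclass
class Group:
    """Runs for one workflow version and group key, oldest first."""
    running: deque = field(default_factory=deque)
    queued: deque = field(default_factory=deque)


def cancel_in_progress(group: Group, max_runs: int) -> list[str]:
    """Admit queued runs, canceling the oldest running runs when needed.

    Returns the IDs of the canceled runs.
    """
    # Step 2: take queued runs, up to max_runs (oldest first is an assumption).
    to_start = [group.queued.popleft()
                for _ in range(min(max_runs, len(group.queued)))]
    # Step 3: cancel just enough of the oldest running runs to make room.
    overflow = len(group.running) + len(to_start) - max_runs
    canceled = [group.running.popleft().run_id
                for _ in range(max(0, overflow))]
    # Step 4: the admitted runs now occupy the freed slots.
    group.running.extend(to_start)
    return canceled


group = Group(
    running=deque([Run("a"), Run("b"), Run("c")]),
    queued=deque([Run("d")]),
)
canceled = cancel_in_progress(group, max_runs=3)
print(canceled)                            # ['a']
print([r.run_id for r in group.running])   # ['b', 'c', 'd']
```

With three slots already full, queuing run `d` cancels only the oldest run `a`; the other running runs are left alone.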

## When to use CANCEL_IN_PROGRESS

The `CANCEL_IN_PROGRESS` strategy is particularly useful in scenarios where:

- You have long-running workflow instances that may become stale or irrelevant if newer instances are triggered.
- You want to prioritize processing the most recent data or events, even if it means canceling older workflow instances.
- You have resource-intensive workflows where it's more efficient to cancel an in-progress instance and start a new one than to wait for the old instance to complete.
- Your UI allows for multiple inputs, but only the most recent is relevant (e.g. chat messages, form submissions, etc.).

However, it's important to note that canceling a workflow instance may leave your system in an inconsistent state if the canceled instance was in the middle of updating a resource. Make sure to design your workflows to handle [cancellation gracefully](../cancellation) and ensure data consistency.
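
One common way to keep a canceled run from leaving half-finished work behind is to check for cancellation only between small, complete units of work. The sketch below is generic Python rather than Hatchet's cancellation API: `threading.Event` stands in for whatever cancellation signal your worker receives, and `process_items` and `items_with_cancel` are hypothetical names.

```python
import threading


def process_items(items, cancel_event, committed):
    """Process items one at a time, stopping only at safe points."""
    for item in items:
        if cancel_event.is_set():
            # Everything already in `committed` is complete and consistent.
            return "canceled"
        committed.append(item.upper())  # one small, self-contained unit of work
    return "completed"


def items_with_cancel(cancel_event):
    """Yield three items and simulate a cancellation arriving after the second."""
    yield "a"
    yield "b"
    cancel_event.set()
    yield "c"


cancel_event = threading.Event()
committed = []
status = process_items(items_with_cancel(cancel_event), cancel_event, committed)
print(status, committed)  # canceled ['A', 'B']
```

Because the check happens before each unit of work starts, the run stops with two fully processed items and never begins the third.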

## How to use CANCEL_IN_PROGRESS

To use the `CANCEL_IN_PROGRESS` concurrency limit strategy, define a `concurrency` configuration in your workflow definition:

<Tabs items={["CEL Expression (Recommended)", "Key Function (Advanced)"]}>
  <Tabs.Tab>

CEL expressions are evaluated on the Hatchet engine (not in your worker) and can be used to extract values from the workflow input or [additional metadata](/features/additional-metadata).

<UniversalTabs items={['Python', 'Typescript', 'Go']}>
  <Tabs.Tab>

```python
from hatchet_sdk import ConcurrencyLimitStrategy, ConcurrencyExpression

@hatchet.workflow(
  on_events=["concurrency-test"],
  concurrency=ConcurrencyExpression(
    expression="input.group",
    max_runs=5,
    limit_strategy=ConcurrencyLimitStrategy.CANCEL_IN_PROGRESS,
  )
)
class ConcurrencyDemoWorkflow:
    @hatchet.step()
    def step1(self, context):
        print("executed step1")

```

  </Tabs.Tab>
  <Tabs.Tab>

```typescript
export const concurrencyDemoWorkflow: Workflow = {
  id: "my-workflow",
  description: "My workflow with concurrency control",
  on: {
    event: "concurrency-test",
  },
  steps: [
    // ...
  ],
  concurrency: {
    name: "my-workflow-concurrency",
    maxRuns: 10,
    limitStrategy: ConcurrencyLimitStrategy.CANCEL_IN_PROGRESS,
    expression: "input.group",
  },
};
```

  </Tabs.Tab>
  <Tabs.Tab>

```go
import (
  "github.com/hatchet-dev/hatchet/pkg/client/types"
  "github.com/hatchet-dev/hatchet/pkg/worker"
)

type MyUser struct {
    UserId string `json:"user_id"`
}

err = w.RegisterWorkflow(
    &worker.WorkflowJob{
        Name: "concurrency-limit-per-user",
        On: worker.Events("concurrency-test-event"),
        Description: "This limits concurrency to 10 runs at a time per user.",
        // The CEL expression reads the user_id field from the workflow input.
        Concurrency: worker.Expression("input.user_id").MaxRuns(10).LimitStrategy(types.CancelInProgress),
        Steps: []*worker.WorkflowStep{
            // your steps here...
        },
    },
)

```

  </Tabs.Tab>
</UniversalTabs>
In this example:

- `maxRuns` sets the maximum number of concurrent instances allowed for this workflow.
- `limitStrategy` is set to `CANCEL_IN_PROGRESS`, indicating that when the concurrency limit is reached, currently running instances should be canceled to make room for new ones.
- `expression` (recommended) is a CEL expression that determines the concurrency key. In the Python and TypeScript examples, the expression is `input.group`, so instances are grouped by the `group` field of the workflow input.

</Tabs.Tab>
  <Tabs.Tab>
In advanced use cases, you may want to use a custom function to determine the concurrency key. Concurrency key functions are evaluated on your Hatchet workers and can extract values from the workflow input, context, and other sources (e.g. a database or an external API).
<UniversalTabs items={['Python', 'Typescript', 'Go']}>

  <Tabs.Tab>

```python
from hatchet_sdk import ConcurrencyLimitStrategy

@hatchet.workflow(on_events=["concurrency-test"])
class ConcurrencyDemoWorkflow:
    @hatchet.concurrency(max_runs=10, limit_strategy=ConcurrencyLimitStrategy.CANCEL_IN_PROGRESS)
    def concurrency(self, context) -> str:
        return context.workflow_input()["user_id"]

    @hatchet.step()
    def step1(self, context):
        print("executed step1")

```

  </Tabs.Tab>
  <Tabs.Tab>

```typescript
export const concurrencyDemoWorkflow: Workflow = {
  id: "my-workflow",
  description: "My workflow with concurrency control",
  on: {
    event: "concurrency-test",
  },
  steps: [
    // ...
  ],
  concurrency: {
    name: "my-workflow-concurrency",
    maxRuns: 10,
    limitStrategy: ConcurrencyLimitStrategy.CANCEL_IN_PROGRESS,
    key: (ctx) => ctx.workflowInput().userId,
  },
};
```

  </Tabs.Tab>
  <Tabs.Tab>

```go
import (
  "github.com/hatchet-dev/hatchet/pkg/client/types"
  "github.com/hatchet-dev/hatchet/pkg/worker"
)

type MyUser struct {
    UserId string `json:"user_id"`
}

// getConcurrencyKey returns the user ID from the workflow input as the concurrency key.
func getConcurrencyKey(ctx worker.HatchetContext) (string, error) {
    event := &MyUser{}

    err := ctx.WorkflowInput(event)
    if err != nil {
        return "", err
    }

    return event.UserId, nil
}

err = w.RegisterWorkflow(
    &worker.WorkflowJob{
        Name: "concurrency-limit-per-user",
        On: worker.Events("concurrency-test-event"),
        Description: "This limits concurrency to 10 runs at a time per user.",
        Concurrency: worker.Concurrency(getConcurrencyKey).MaxRuns(10).LimitStrategy(types.CancelInProgress),
        Steps: []*worker.WorkflowStep{
            // your steps here...
        },
    },
)

```

  </Tabs.Tab>
</UniversalTabs>
In this example:

- `maxRuns` sets the maximum number of concurrent instances allowed for this workflow.
- `limitStrategy` is set to `CANCEL_IN_PROGRESS`, indicating that when the concurrency limit is reached, currently running instances should be canceled to make room for new ones.
- `key` is a function that takes the workflow context and returns a string key. This key groups workflow instances for concurrency limiting and determines which instances are eligible for cancellation. In this example, instances are grouped by user ID, so once a user has `maxRuns` instances running, triggering another instance for that user cancels that user's oldest running instance.

</Tabs.Tab>
</Tabs>

> NOTE: Only one of `key` or `expression` should be provided, not both.

With this configuration, Hatchet will automatically manage your workflow's concurrency, canceling in-progress instances as needed to ensure that new instances can start executing immediately.

Remember to design your workflows to handle cancellation gracefully, and consider using checkpointing or idempotency techniques to ensure data consistency in the face of canceled instances.
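
As a minimal sketch of the idempotency idea, the example below records each side effect under an idempotency key, so a replacement run that repeats a step already performed by a canceled run does not apply it twice. The `Ledger` class and the `order-42` key are hypothetical; an in-memory dict stands in for a durable store with a unique constraint on the key.

```python
class Ledger:
    """In-memory stand-in for a store with a unique constraint on the key."""

    def __init__(self) -> None:
        self.total_cents = 0
        self._receipts: dict[str, str] = {}

    def charge(self, idempotency_key: str, amount_cents: int) -> str:
        # A repeated key means an earlier (possibly canceled) run already
        # applied this charge, so return the recorded receipt instead.
        if idempotency_key in self._receipts:
            return self._receipts[idempotency_key]
        receipt = f"receipt-{len(self._receipts) + 1}"
        self._receipts[idempotency_key] = receipt
        self.total_cents += amount_cents
        return receipt


ledger = Ledger()
first = ledger.charge("order-42", 1500)   # step in a run that is later canceled
second = ledger.charge("order-42", 1500)  # replacement run repeats the step
print(first == second, ledger.total_cents)  # True 1500
```

Deriving the key from the workflow input (here, an order ID) rather than from the run ID is what lets the replacement run recognize work done by the canceled one.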
